This document is used to describe the use of the provided task files, as well as the use and updating of the included modules and related functions.
It is also possible to create custom tasks and distribute each sub-task to each connected Mega mBots and later reconstruct the result of each sub-task. To do this, there two steps:
1.) On the client side, create a function task in a new Python file. This file is used for each Mega mBot to compute the sub-task distributed to it. Hence, it must contain only one parameter, which is used to pass the data assigned to it. Once the computation is done, it must return the result, which will later be collected and reconstructed on the client (a.k.a. the master node). Similar to the run method inside the Mobility class, if you want to import any module, you must import inside this task function. Here is example of computing uncoded convolution:
def task(data): import numpy as np s = data[0] A = data[1:1+s] X = data[1+s:] rs = np.convolve(A, X) return rs
2.) On the client side again (which is also the master node), you should edit the code on ./master.py
for data decomposition, result reconstruction, and ways to get useful information of the task. This means you must have at least three functions: data_decompose, reconstruct and get_information. The first function data_decompose is used to decompose data into small piece and prepare to send to each connect Mega mBot, and it has two parameters. The number of connected Mega mBots passed by the first parameter, and the state (communication and computation latency) per Mega mBot is passed by the second parameter. The second function reconstruct is used to reconstruct results return by Mega mBots. Thus, it has only one parameter, which is used to pass the results. The get_information function is used to return useful information of the task (e.g., size of A), and it is okay to return None
if the needed information is pre-stored in data_decompose and reconstruct. The following is the example of data decomposition and result reconstruction for uncoded convolution:
import math import numpy as np def data_decompose(proc_num, state = None): size_of_A, size_of_X = (40000, 20000) s = int(math.sqrt((size_of_A * size_of_X) / proc_num)) np.random.seed(35) A = np.random.randint(1000, size=size_of_A).tolist() X = np.random.randint(1000, size=size_of_X).tolist() iter1 = int(size_of_A / s) iter2 = int(size_of_X / s) data_list = [] for i in range(0, iter1): for j in range(0, iter2): data = [s] data += A[i * s:(i + 1) * s] data += X[j * s:(j + 1) * s] data_list.append(data) return data_list def reconstruct(data_list): proc_num = len(data_list) size_of_A, size_of_X = (40000, 20000) s = int(math.sqrt((size_of_A * size_of_X) / proc_num)) iter1 = int(size_of_A / s) iter2 = int(size_of_X / s) n_A = s + size_of_X - 1 n_Z = len(data_list[0]) result = [] for i in range(0, iter1): data = [] partial_result = [0 for i in range(0, n_A)] for j in range(0, iter2): padding_left = s * j padding_right = n_A - n_Z - s * j data_temp = data_list[i*iter2+j] data_temp = np.pad(data_temp, (padding_left, padding_right), 'constant') data.append(data_temp) for d in data: partial_result = np.add(partial_result, d) result.append(partial_result) n = size_of_A + size_of_X - 1 n_Z = len(result[0]) data = [] final_result = [0 for i in range(0, n)] for j in range(0, len(result)): padding_left = s * j padding_right = n - n_Z - s * j data_temp = np.pad(result[j], (padding_left, padding_right), 'constant') data.append(data_temp) for d in data: final_result = np.add(final_result, d) # return final_result return np.round(final_result).astype(int) def get_information(): pass
Note: If the data returned by the data_decompose function is in numpy array format, be sure to convert it to a list through .tolist()
.
./task/uncoded1.py
. ./master.py
. $ python3 client.py Welcome to use Mega mBot control client.1). Control all Mega mBots in this network 2). Control a single Mega mBot in this network
Select from 1 to 2: 1 Now scan all available Mega mBots... 2022-03-15 15:12:39: Start scanning... - 192.168.1.114:8085 is available - 192.168.1.150:8085 is available
2022-03-15 15:13:05: Scan completed. Trying to Connect to 192.168.1.114:8085 Trying to Connect to 192.168.1.150:8085 Connected to all available Mega mBots. Instruction: task Task file location: task/uncoded1.py - ins to 192.168.1.114: task - ins to 192.168.1.150: task Final Result: [ 581095 863278 1330104 ... 793678 449146 109648] >> Total time (computation + communication + reconstruction): 3.90215s <<
The idea of uncoded convolution can be learned from the paper - Coded convolution for parallel and distributed computing within a deadline. In this section we will demonstrate how to use the Mega mBot control system to compute uncoded convolutions when connecting two Mega mBots.
1.) Copy-paste the function task from the first step of the Custom Tasks section and save it to a new Python file ./task/uncoded1.py
.
2.) Copy-paste all code from the second step of the Custom Tasks section to the Python file ./master.py
.
3.) On the client side, run:
$ python3 client.py Welcome to use Mega mBot control client.1). Control all Mega mBots in this network 2). Control a single Mega mBot in this network
Select from 1 to 2: 1 Now scan all available Mega mBots... 2022-03-15 15:12:39: Start scanning... - 192.168.1.114:8085 is available - 192.168.1.150:8085 is available
2022-03-15 15:13:05: Scan completed. Trying to Connect to 192.168.1.114:8085 Trying to Connect to 192.168.1.150:8085 Connected to all available Mega mBots. Instruction: task Task file location: task/uncoded1.py - ins to 192.168.1.114: task - ins to 192.168.1.150: task Final Result: [ 581095 863278 1330104 ... 793678 449146 109648] >> Total time (computation + communication + reconstruction): 3.90215s <<
The idea of coded convolution can be learned from the paper - Coded convolution for parallel and distributed computing within a deadline. In this section we will demonstrate how to use the Mega mBot control system to compute coded convolutions when connecting two Mega mBots.
1.) Copy-paste the following code and save it to a new Python file ./task/prop.py
.
def task(data): import numpy as np s = int(data[0]) j = int(data[1]) index = int(data[2]) A = data[3:3+s] X = data[3+s:] rs = np.convolve(A, X) rs = np.concatenate((j, index, rs), axis=None) return rs
2.) Copy-paste all code from the following code to the Python file ./master.py
.
import math import numpy as np _proc_num = 0 def data_decompose(proc_num, state = None): global _proc_num _proc_num = proc_num info = get_information() size_of_A, size_of_X = info['size_of_A'], info['size_of_X'] n, s = info['n'], info['s'] np.random.seed(35) A = np.random.randint(1000, size=size_of_A).tolist() X = np.random.randint(1000, size=size_of_X).tolist() iter1 = int(size_of_A / s) iter2 = int(size_of_X / s) G = generator(n, iter1) a_reshape = np.reshape(A, (iter1, s)) a_hat = np.matmul(G, a_reshape) a_hat = a_hat.tolist() for i in range(0, n): # include index for building decoded matrix a_hat[i] = np.concatenate((i, a_hat[i]), axis=None).tolist() data_list = [] for i in range(0, n): for j in range(0, iter2): data = [s, j] data += a_hat[i] data += X[j * s:(j + 1) * s] data_list.append(data) return data_list def reconstruct(data_list): info = get_information() size_of_A, size_of_X = info['size_of_A'], info['size_of_X'] n, s = info['n'], info['s'] iter1 = int(size_of_A / s) iter2 = int(size_of_X / s) G = generator(n, iter1) # organize data_list collections = [[] for i in range(0, iter2)] for data in data_list: j = int(data[0]) rs = data[1:] collections[j].append(rs) # reconstruct results collection_2 = [] for j in range(0, iter2): collection = collections[j] indexes = [] results_partial = [] G_partial = np.array([]) for result_slice in collection: if (len(indexes) == iter1): break index = int(result_slice[0]) results_partial.append(result_slice[1:]); indexes.append(index) for index in indexes: G_partial = np.append(G_partial, G[index]) G_partial = np.reshape(G_partial, (len(indexes), iter1)) G_1 = np.linalg.inv(G_partial) result = np.matmul(G_1, results_partial) for (i, result_slice) in enumerate(result): # padding with 0 for a a_total_padding = (len(result) - 1) * s a_left = i * s a_right = a_total_padding - a_left result_slice = np.pad(result_slice, (a_left, a_right), 'constant') # padding with 0 for x x_total_padding = size_of_A + size_of_X - 1 - len(result_slice) x_left = j * s x_right = x_total_padding - x_left result_slice = np.pad(result_slice, (x_left, x_right), 'constant') collection_2.append(result_slice) result = np.zeros(size_of_A + size_of_X - 1) for result_slice in collection_2: result = np.add(result, result_slice) # return final_result return np.round(result).astype(int) def get_information(): size_of_A, size_of_X = (40000, 20000) proc_num = _proc_num s = int(math.sqrt((size_of_A * size_of_X) / proc_num)) + 1 iter1 = int(size_of_A / s) iter2 = int(size_of_X / s) while proc_num != iter1 * iter2 and size_of_A % s != 0 and size_of_X % s != 0: s = s + 1 iter1 = int(size_of_A / s) iter2 = int(size_of_X / s) if s > size_of_A or s > size_of_X: raise ValueError('Please check for the size of A, the size of X and the number of nodes.') n = int((proc_num * s) / size_of_X) return { 'size_of_A': size_of_A, 'size_of_X': size_of_X, 'n': n, 's': s } def generator(n, columns): start_at = 0 # step = 0.0065555 step = 0.15 end_at = start_at + step * n x = np.arange(start_at, end_at, step) G = np.vander(x, columns) G_t = np.fliplr(G) return G_t def spliter(data): return [data[0], data[1:]]
3.) On the client side, run:
$ python3 client.py Welcome to use Mega mBot control client.It should be noted that due to the lack of Mega mBots, we were running 81). Control all Mega mBots in this network 2). Control a single Mega mBot in this network
Select from 1 to 2: 1 Now scan all available Mega mBots... Trying to Connect to 192.168.1.114:8085 Trying to Connect to 192.168.1.114:8081 Trying to Connect to 192.168.1.114:8082 Trying to Connect to 192.168.1.114:8083 Trying to Connect to 192.168.1.114:8084 Trying to Connect to 192.168.1.114:8086 Trying to Connect to 192.168.1.114:8088 Trying to Connect to 192.168.1.114:8087 Connected to all available Mega mBots. Instruction: task Task file location: task/coded.py - ins to 192.168.1.114: task - ins to 192.168.1.114: task - ins to 192.168.1.114: task - ins to 192.168.1.114: task - ins to 192.168.1.114: task - ins to 192.168.1.114: task - ins to 192.168.1.114: task - ins to 192.168.1.114: task Final Result: [ 581095 863278 1330104 ... 793678 449146 109648] >> Total time (computation + communication + reconstruction): 8.25399s <<
server.py
(with different ports) in the same Mega mBot, so the performance of distributed computing is not reflected in this example.
The idea of smooth-turn mobility model for airborne networks can be learned from the paper - A Smooth-Turn Mobility Model for Airborne Networks. The Matlab code provided by this paper, can be downloaded here. The following is the smooth-turn mobility control implement in the Mega mBot Control System:
class Mobility: def __init__(self, car): self.car = car self.V = 500 # Speed self.phi = 0 # Heading angle (radian) self.delta_T = 6 # Unit time step self.reflection = False def run(self): import time expire_after = 60 expired_time = time.time() + expire_after while(1): if time.time() > expired_time: break theta = self.generate_theta() self.turn_with_angle(theta, self.generate_interval()) self.car.move_stop() time.sleep(0.2) print("mega mBot stop!") def generate_theta(self, R = None): if R == None: R = self.generate_radius() W = self.V / R theta = W * self.delta_T return theta def generate_radius(self): import numpy as np varian=3.105e-5 # Variance of the Gaussian variable, determining the preference between straight trajectories and turns R_d_1 = np.random.normal(0, varian) R = 1/R_d_1 return R def generate_interval(self): import numpy as np ExponentialMean = 100 # Mean duration between the changes of turning centers interval = np.random.exponential(ExponentialMean) / self.delta_T interval = max(3, interval) interval = min(60, interval) return interval def turn_with_angle(self, theta, duration, time_step = 0.3, show_msg = True): import time angle = abs(theta) forward_speed = 50 turn_speed = forward_speed * angle msg = "Turn left with:\n" if theta > 0 else "Turn right with:\n" msg += " - forward_speed: " + str(forward_speed) + "\n" msg += " - turn_speed: " + str(turn_speed) + "\n" msg += " - theta: " + str(theta) + "\n" msg += " - duration: " + str(duration) + "\n" if show_msg: print(msg) stop_time = time.time() + duration while time.time() < stop_time: if theta > 0: self._turn_left(forward_speed, turn_speed, time_step / 2) else: self._turn_right(forward_speed, turn_speed, time_step / 2) # relection if reach to the boundary if self._reflex(): self.reflection = not self.reflection self.car.LED(1, "74aacc") self.car.LED(2, "74aacc") else: self.car.LED_reset(1) self.car.LED_reset(2) def _turn_left(self, forward_speed, left_speed, time_step = 0.2): from time import sleep if self.reflection == False: self.car.move_forward(forward_speed) else: self.car.move_backward(forward_speed) sleep(time_step) self.car.move_left(left_speed) sleep(time_step) def _turn_right(self, forward_speed, right_speed, time_step = 0.2): from time import sleep if self.reflection == False: self.car.move_forward(forward_speed) else: self.car.move_backward(forward_speed) sleep(time_step) self.car.move_right(right_speed) sleep(time_step) def _reflex(self): IR1 = self.car.is_sensing('IR1') IR2 = self.car.is_sensing('IR2') IR3 = self.car.is_sensing('IR3') IS1 = self.car.is_sensing('IS1') IS2 = self.car.is_sensing('IS2') if IS1 or IS2 or IR1 or IR2 or IR3: return True return False
The idea of smooth-turn mobility model for airborne networks can be learned from the paper - A Smooth-Turn Mobility Model for Airborne Networks. The Matlab code provided by this paper, can be downloaded here. The following is the smooth-turn mobility control implement in the Mega mBot Control System:
class Mobility: def __init__(self, car): self.car = car self.V = 500 # Speed self.phi = 0 # Heading angle (radian) self.delta_T = 6 # Unit time step self.reflection = False def run(self): import time expire_after = 60 expired_time = time.time() + expire_after while(1): if time.time() > expired_time: break theta = self.generate_theta() self.turn_with_angle(theta, self.generate_interval()) self.car.move_stop() time.sleep(0.2) print("mega mBot stop!") def generate_theta(self, R = None): if R == None: R = self.generate_radius() W = self.V / R theta = W * self.delta_T return theta def generate_radius(self): import numpy as np varian=3.105e-5 # Variance of the Gaussian variable, determining the preference between straight trajectories and turns R_d_1 = np.random.normal(0, varian) R = 1/R_d_1 return R def generate_interval(self): import numpy as np ExponentialMean = 100 # Mean duration between the changes of turning centers interval = np.random.exponential(ExponentialMean) / self.delta_T interval = max(3, interval) interval = min(60, interval) return interval def turn_with_angle(self, theta, duration, time_step = 0.3, show_msg = True): import time angle = abs(theta) forward_speed = 50 turn_speed = forward_speed * angle msg = "Turn left with:\n" if theta > 0 else "Turn right with:\n" msg += " - forward_speed: " + str(forward_speed) + "\n" msg += " - turn_speed: " + str(turn_speed) + "\n" msg += " - theta: " + str(theta) + "\n" msg += " - duration: " + str(duration) + "\n" if show_msg: print(msg) stop_time = time.time() + duration while time.time() < stop_time: if theta > 0: self._turn_left(forward_speed, turn_speed, time_step / 2) else: self._turn_right(forward_speed, turn_speed, time_step / 2) # relection if reach to the boundary if self._reflex(): self.reflection = not self.reflection self.car.LED(1, "74aacc") self.car.LED(2, "74aacc") else: self.car.LED_reset(1) self.car.LED_reset(2) def _turn_left(self, forward_speed, left_speed, time_step = 0.2): from time import sleep if self.reflection == False: self.car.move_forward(forward_speed) else: self.car.move_backward(forward_speed) sleep(time_step) self.car.move_left(left_speed) sleep(time_step) def _turn_right(self, forward_speed, right_speed, time_step = 0.2): from time import sleep if self.reflection == False: self.car.move_forward(forward_speed) else: self.car.move_backward(forward_speed) sleep(time_step) self.car.move_right(right_speed) sleep(time_step) def _reflex(self): IR1 = self.car.is_sensing('IR1') IR2 = self.car.is_sensing('IR2') IR3 = self.car.is_sensing('IR3') IS1 = self.car.is_sensing('IS1') IS2 = self.car.is_sensing('IS2') if IS1 or IS2 or IR1 or IR2 or IR3: return True return False